package com.markhsiu.minimq.broker.store.disk;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.markhsiu.minimq.message.Message;

/**
 * Disk-backed queue holding the stored data of a single topic.
 *
 * <p>The queue keeps a {@link TopicDiskIndex} together with one block that is
 * currently being read and one block that is currently being written. When the
 * read and write block numbers recorded in the index are equal, the read block
 * is obtained from {@code writeBlock.duplicate()} instead of being opened
 * separately.
 *
 * <p>{@link #poll()} runs under the read lock and {@link #offer(byte[])} under
 * the write lock; {@link #sync()} and {@link #close()} take no lock.
 *
 * @author Mark Hsiu
 */
public class TopicDiskQueue {

    private static final Logger logger = LoggerFactory.getLogger(TopicDiskQueue.class);

    private final String topic;
    private final String fileDir;
    private final TopicDiskIndex index;
    private TopicDiskBlock readBlock;
    private TopicDiskBlock writeBlock;
    private final ReentrantLock readLock;
    private final ReentrantLock writeLock;
    private final AtomicInteger size;

    /**
     * Opens the queue for {@code topic}, using the index to decide which
     * blocks to open and how many entries are still unread.
     *
     * @param fileDir directory passed to the index and to every block
     * @param topic   topic name, used to build the block file names
     */
    public TopicDiskQueue(String fileDir, String topic) {
        this.topic = topic;
        this.fileDir = fileDir;
        this.readLock = new ReentrantLock();
        this.writeLock = new ReentrantLock();

        this.index = new TopicDiskIndex(fileDir, topic);

        // Entries written but not yet read.
        this.size = new AtomicInteger(index.getWriteCount() - index.getReadCount());
        this.writeBlock = new TopicDiskBlock(
                fileDir, DiskUtils.blockFileName(topic, index.getWriteNum()), index);
        if (index.getReadNum() == index.getWriteNum()) {
            // Reader and writer are in the same block: share it.
            this.readBlock = this.writeBlock.duplicate();
        } else {
            // The block file is identified by the read block NUMBER, not by the
            // read counter (the counter was used here before, which selected
            // the wrong file name).
            this.readBlock = new TopicDiskBlock(
                    fileDir, DiskUtils.blockFileName(topic, index.getReadNum()), index);
        }
    }

    /**
     * @return the current value of the entry counter (incremented by each
     *         successful {@link #offer(byte[])}, decremented by each
     *         {@link #poll()} that returned data)
     */
    public int size() {
        return this.size.get();
    }

    /**
     * Serializes {@code message} with {@code buildByte()} and appends it.
     *
     * @param message message to store
     * @return the result of {@link #offer(byte[])}
     */
    public boolean write(Message message) {
        return offer(message.buildByte());
    }

    /**
     * Reads the next entry and restores it into a {@link Message}.
     *
     * @return the restored message, or {@code null} when {@link #poll()}
     *         returned {@code null}
     */
    public Message read() {
        byte[] bytes = poll();
        if (bytes != null) {
            return Message.restore(bytes);
        }
        return null;
    }

    /**
     * Reads the next raw entry from the read block, moving on to the next
     * block first when the current one reports EOF.
     *
     * @return the bytes returned by the read block; may be {@code null}, in
     *         which case the size counter is left unchanged
     */
    public byte[] poll() {
        readLock.lock();
        try {
            if (readBlock.eof()) {
                rotateNextReadBlock();
            }
            byte[] bytes = readBlock.read();
            if (bytes != null) {
                size.decrementAndGet();
            }
            return bytes;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Appends a raw entry to the write block, rotating to a new block first
     * when the current one has no room for {@code bytes.length} bytes.
     *
     * @param bytes entry to store; a {@code null} or empty array is ignored
     * @return always {@code true} (also for an ignored empty entry)
     */
    public boolean offer(byte[] bytes) {
        if (ArrayUtils.isEmpty(bytes)) {
            return true;
        }
        writeLock.lock();
        try {
            if (!writeBlock.isSpaceAvailable(bytes.length)) {
                rotateNextWriteBlock();
            }
            writeBlock.write(bytes);
            size.incrementAndGet();
            return true;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Syncs the index and the write block. A failure while syncing the index
     * is logged and does not prevent the write block from being synced.
     */
    public void sync() {
        try {
            index.sync();
        } catch (Exception e) {
            // NOTE(review): the message mentions "zk" although this call syncs
            // the topic index - confirm the intended wording.
            logger.error("sync to zk error", e);
        }
        // The read block is only read from, so it does not need to be synced.
        writeBlock.sync();
    }

    /**
     * Closes the blocks, then resets and closes the index.
     */
    public void close() {
        writeBlock.close();
        // With equal block numbers the read block came from
        // writeBlock.duplicate(); it is then not closed separately
        // (presumably it shares the write block's resources - confirm).
        if (index.getReadNum() != index.getWriteNum()) {
            readBlock.close();
        }
        index.reset();
        index.close();
    }

    /**
     * Finishes the current write block and switches to the next block number,
     * recording the new number and a write position of 0 in the index.
     */
    private void rotateNextWriteBlock() {
        int nextWriteBlockNum = index.getWriteNum() + 1;
        // Wrap back to 0 if the increment overflowed into negative numbers.
        nextWriteBlockNum = (nextWriteBlockNum < 0) ? 0 : nextWriteBlockNum;
        writeBlock.putEOF();
        if (index.getReadNum() == index.getWriteNum()) {
            // The reader is still on this block number, so only sync it.
            writeBlock.sync();
        } else {
            writeBlock.close();
        }
        writeBlock = new TopicDiskBlock(
                fileDir, DiskUtils.blockFileName(topic, nextWriteBlockNum), index);
        index.putWriteNum(nextWriteBlockNum);
        index.putWritePosition(0);
    }

    /**
     * Closes the current read block, switches to the next block number and
     * hands the path of the finished block to {@code TopicDiskPool.toClear}.
     * Does nothing while the reader is on the same block number as the writer.
     */
    private void rotateNextReadBlock() {
        if (index.getReadNum() == index.getWriteNum()) {
            // The read block may only advance after the write block has advanced.
            return;
        }
        int nextReadBlockNum = index.getReadNum() + 1;
        // Wrap back to 0 if the increment overflowed into negative numbers.
        nextReadBlockNum = (nextReadBlockNum < 0) ? 0 : nextReadBlockNum;

        // Remember the path of the block being left BEFORE readBlock is
        // replaced; reading it afterwards would yield the path of the block
        // that is about to be read and pass that one to toClear instead.
        String finishedBlockPath = readBlock.getBlockFilePath();
        readBlock.close();
        if (nextReadBlockNum == index.getWriteNum()) {
            readBlock = writeBlock.duplicate();
        } else {
            readBlock = new TopicDiskBlock(
                    fileDir, DiskUtils.blockFileName(topic, nextReadBlockNum), index);
        }
        index.putReadNum(nextReadBlockNum);
        index.putReadPosition(0);
        TopicDiskPool.toClear(finishedBlockPath);
    }

    /**
     * @return the string form of the underlying index
     */
    @Override
    public String toString() {
        return index.toString();
    }

}
